0%

从Mpsc到RingBuffer(二)- RingBuffer

前言

前面我们聊完了Mpsc,在提一下,Mpsc主要是针对的单消费者多生产者的情况。

对于消费者而言,因为只有一个消费者,所以不需要任何同步。

对于生产者而言,为了防止多线程下会出现问题,所以使用CAS操作。

但是上一篇文章中Mpsc使用的是链表的结构,不加以控制容易OOM。

为了避免这个问题,我们可以使用数组来作为底层存储。

原理其实和这边文章的RingBuffer讲的类似。

这里的RingBuffer其实就是Disruptor的实现,不过我单独抽成了一个文章来讲Disruptor的源码。

这里就划一些示意图,解释RingBuffer的原理。

位置的二阶段写入

想象一下,如果我们有一个数组,我们要添加一个元素,单线程写入

1
2
3
public void put(T obj) {
arr[index++] = obj;
}

这样当然是没有问题的,如果是多线程呢,这样就会出现问题,因为Index++不是原子操作。

可能两个线程进入后,写入的其实是同一个位置的元素。

那怎么办呢?

我们可以简单的加锁

1
2
3
public synchronized void put(T obj) {
arr[index++] = obj;
}

但是这样性能就会下降很多。

有没有其他的办法,就要借助简单的CAS就可以实现呢?

前面提到,问题在于Index++不是原子的操作,那么我们将Index++这个操作,变成原子的不就行了。

1
2
3
4
public void put(T obj) {
int position = index.inc(); //inc假设是原子操作 // 1
arr[position] = obj; // 2
}

这样就行了。

这里我们把操作分为了两步

  1. 第一步,占位,这个操作是原子的
  2. 第二步,写入元素

RingBuffer的写入

image-20200826221709219

有了上述的两阶段写入的基础,对于RingBuffer的写入就好理解了。

假设我们的写入偏移指针指向arr[1]

这个时候,有一个Producer想要写入一个数据咋办。

我们CAS这个Cursor,获取下一个位置的写入权限

伪代码如下:

1
2
3
4
5
6
7
8
9
10
public int nextPosition() {
for (;;) {
int currentIndex = current;
boolean success = unsafe.cas(address("currentIndex"), currentIndex, currentIndex + 1);
if (success) {
return currentIndex + 1;
break;
}
}
}

获取成功后,往这个位置写入数据就行。

image-20200826221709219

如果这个生产者同时有N个元素需要写入,那我们就直接申请下面N个位置的权限,然后依次写入就行。

代码上只要修改CAS的第三个参数就行。

所以,对于写入而言,多线程写入,只要CAS这个写入的偏移指针,先获取写入的位置信息,下面再塞入数据,可以极大的避免Lock。

image-20200826221709219

如图所示,显示有2个线程同时在写入数据。

注意,这里存在一种状态,数据还未被完全写入成功,Write Cursor之前的格子里并不是全部都有数据了。

RingBuffer读取

对于RingBuffer的读取,其实也比较简单,也是一个二阶段的读取。

先对readIndex,进行CAS的加n。

成功后,读取返回的值的Position的值。

但是这里注意一个问题:

  1. 返回的Position位置的值可能还未被写入,所以读取的可能是个空。

怎么解决这个问题呢?

其实也不算是个问题,反复读取几遍,直到不为空就行。

但是在Disruptor中,使用了完全不同的做法,和我的方法略有不同。在下一篇文章中会讲。

ReadIndex和WriteIndex的冲突

从上面的文章,我们可以理解为没有冲突,也就是这个数组的长度是无限长的。

所以我们仅仅维护了一个WriteIndex。

但是在实际的情况中,数组的长度都固定的,到了末尾之后就要从头开始写。

所以这里仅仅维护一个WriteIndex是不够的,还需要维护一个ReadIndex。

使用这两个Index来判断数组是否是空的或者已经满了。

参考文章

https://www.jianshu.com/p/297819b95770

https://juejin.im/post/6844903840156745742